package com.fintech.pangu.elasticsearch.service.impl;

import com.fintech.pangu.elasticsearch.service.BulkProcessorService;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;

import java.util.List;

/**
 * ElasticSearch批量处理服务接口实现类
 *
 * @author chendongdong
 * @date 2019/04/13
 **/
public class BulkProcessorServiceImpl implements BulkProcessorService {

    /**
     * 批量执行进程对象
     */
    private BulkProcessor bulkProcessor;

    public BulkProcessorServiceImpl(BulkProcessor bulkProcessor) {
        this.bulkProcessor = bulkProcessor;
    }

    /**
     * 批量新增操作
     *
     * @param insertRequestList 批量新增请求列表
     **/
    @Override
    public void bulkInsertProcessor(List<IndexRequest> insertRequestList) {
        insertRequestList.stream().forEach(insertRequest -> bulkProcessor.add(insertRequest));
    }

    /**
     * 批量更新操作
     *
     * @param updateRequestList 批量更新请求列表
     **/
    @Override
    public void bulkUpdateProcessor(List<UpdateRequest> updateRequestList) {
        //添加单次请求
        updateRequestList.stream().forEach(updateRequest -> bulkProcessor.add(updateRequest));
    }
}
